#pragma once

#include <vector>
#include <iostream>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <pthread.h>
#include <string>
#include <unordered_map>
#include <memory>
#include <cassert>
#include <thread>
#include <mutex>
#include <signal.h>
#include <condition_variable>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#include <arpa/inet.h>
#include <functional>
#include <regex>

#include "LOG.hpp"
#include "Any.hpp"

class Buffer
{
private:
    std::vector<char> _buffer;
    uint64_t _read_idx;
    uint64_t _write_idx;

public:
    Buffer()
        : _read_idx(0), _write_idx(0), _buffer(1024)
    {
    }
    ~Buffer()
    {
    }
    char *Begin() { return &*_buffer.begin(); }
    // 获取读地址
    char *ReadPosition() { return Begin() + _read_idx; }
    // 获取写地址
    char *WritePosition() { return Begin() + _write_idx; }
    // 获取前沿空闲大小
    uint64_t HeadIdleSize() { return _read_idx; }
    // 获取后沿空闲大小
    uint64_t TailIdleSize() { return _buffer.size() - _write_idx; }
    // 获取可读空间大小
    uint64_t ReadAbleSize() { return _write_idx - _read_idx; }
    // 将读偏移向后偏移
    void MoveReadOffSet(uint64_t len)
    {
        if (len == 0)
            return;
        assert(len <= ReadAbleSize());
        _read_idx += len;
    }
    // 将写偏移向后偏移
    void MoveWriteOffSet(uint64_t len)
    {
        assert(len <= TailIdleSize());
        _write_idx += len;
    }
    // 确保写入空间足够(若剩余空间大于len，就将数据移动到前面，若是剩余空间小于len，就扩容)
    void EnsureWriteSpace(uint64_t len)
    {
        // 末尾的空间足够
        if (len <= TailIdleSize())
        {
            return;
        }
        // 若是剩余的空间足够，就将数据向前偏移
        if (len <= TailIdleSize() + HeadIdleSize())
        {
            uint64_t rsz = ReadAbleSize();
            // 需要保存的数据就是从读偏移量的位置开始，长度为可读长度，然后复制到_buffer的begin中
            std::copy(ReadPosition(), ReadPosition() + rsz, Begin());
            _read_idx = 0;
            _write_idx = rsz;
        }
        // 若是剩余的空间都不够，就需要扩容
        else
        {
            DEBUG_LOG("resize : %d",_write_idx + len);
            _buffer.resize(_write_idx + len);
        }
    }
    // 写入数据(将inbuffer中的数据保存到Buffer中，长度为len)
    void Write(const char *inbuffer, uint64_t len)
    {
        if (len == 0)
            return;
        EnsureWriteSpace(len);
        const char *d = (const char *)inbuffer;
        std::copy(d, d + len, WritePosition());
    }
    void WriteAndPush(const char *inbuffer, uint64_t len)
    {
        Write(inbuffer, len);
        MoveWriteOffSet(len);
    }
    void WriteAsString(std::string &inbuffer)
    {
        Write(inbuffer.c_str(), inbuffer.size());
    }
    void WriteAsStringAndPush(std::string &inbuffer)
    {
        WriteAsString(inbuffer);
        MoveWriteOffSet(inbuffer.size());
    }

    void WriteAsBuffer(Buffer &inbuffer)
    {
        Write(inbuffer.ReadPosition(), inbuffer.ReadAbleSize());
    }

    void WriteAsBufferAndPush(Buffer &inbuffer)
    {
        WriteAsBuffer(inbuffer);
        MoveWriteOffSet(inbuffer.ReadAbleSize());
    }

    // 读取数据(将Buffer中长len的数据给到outbuffer中去)
    void Read(void *outbuffer, uint64_t len)
    {
        assert(len <= ReadAbleSize());
        std::copy(ReadPosition(), ReadPosition() + len, (char *)outbuffer);
    }
    void ReadAndPush(void *outbuffer, uint64_t len)
    {
        Read(outbuffer, len);
        MoveReadOffSet(len);
    }
    std::string ReadAsString(uint64_t len)
    {
        assert(len <= ReadAbleSize());
        std::string str;
        str.resize(len);
        Read(&str[0], len);
        return str;
    }
    std::string ReadAsStringAndPop(uint64_t len)
    {
        std::string str;
        str = ReadAsString(len);
        MoveReadOffSet(len);
        return str;
    }

    char *FindCRLF()
    {
        char *res = (char*)memchr(ReadPosition(), '\n', ReadAbleSize());
        return (char *)res;
    }

    std::string GetOneLine()
    {
        char *pos = FindCRLF();
        if (pos == NULL)
        {
            return "";
        }
        std::string str;
        str = ReadAsString(pos - ReadPosition() + 1);
        return str;
    }

    std::string GetOneLineAndPop()
    {
        std::string str = GetOneLine();
        MoveReadOffSet(str.size());
        return str;
    }

    // 清空数据
    void Clear()
    {
        _read_idx = 0;
        _write_idx = 0;
    }
};

#define MAX_BACKLOG 1024

class Socket
{
private:
    int _sockfd;

public:
    Socket() : _sockfd(-1)
    {
    }
    Socket(int fd) : _sockfd(fd)
    {
    }
    ~Socket()
    {
        Close();
    }
    int Fd()
    {
        return _sockfd;
    }
    // 创建套接字
    bool Create()
    {
        // socket(int domain,int type,int protocol)
        _sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (_sockfd < 0)
        {
            ERROR_LOG("SOCKET CEREATE FAILED!!");
            return false;
        }
        return true;
    }
    // 绑定套接字
    bool Bind(const std::string &ip, uint16_t port)
    {
        struct sockaddr_in addr;
        bzero(&addr, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = inet_addr(ip.c_str());
        addr.sin_port = htons(port);
        socklen_t len = sizeof(struct sockaddr_in);
        int ret = bind(_sockfd, (struct sockaddr *)&addr, len);
        if (ret < 0)
        {
            ERROR_LOG("SOCKET BIND FAILED!!");
            return false;
        }
        return true;
    }
    // 设置监听
    bool Listen(int backlog = MAX_BACKLOG)
    {
        int ret = listen(_sockfd, backlog);
        if (ret < 0)
        {
            ERROR_LOG("SOCKET LISTEN FAILLED!!");
            return false;
        }
        return true;
    }
    // 发起连接
    bool Connect(const std::string &ip, uint16_t port)
    {
        struct sockaddr_in addr;
        bzero(&addr, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = inet_addr(ip.c_str());
        addr.sin_port = htons(port);
        socklen_t len = sizeof(struct sockaddr_in);
        int ret = connect(_sockfd, (struct sockaddr *)&addr, len);
        if (ret < 0)
        {
            ERROR_LOG("SOCKET CONNECT FAILED!!");
            return false;
        }
        return true;
    }
    // 接收连接
    int Accept()
    {
        int newfd = accept(_sockfd, NULL, NULL);
        if (newfd < 0)
        {
            ERROR_LOG("SOCKET ACCEPT FAILED!!");
            return -1;
        }
        return newfd;
    }
    // 接收数据
    ssize_t Recv(void *buf, size_t len, int flag = 0)
    {
        ssize_t ret = recv(_sockfd, buf, len, flag);
        if (ret <= 0)
        { // EAGAIN 表示非阻塞状态下，缓冲区没有数据了
            // EINTR 表示接收的时候，外部出现错误，列入信号打断
            if (errno == EAGAIN || errno == EINTR)
            {
                return 0;
            }
            ERROR_LOG("SOCKET RECV FAILED!!");
            return -1;
        }
        return ret;
    }
    // 非阻塞接收数据
    ssize_t RecvNonBlock(void *buf, size_t len)
    {
        return Recv(buf, len, MSG_DONTWAIT);
    }

    // 发送数据
    ssize_t Send(const void *buf, size_t len, int flag = 0)
    {
        if (len == 0)
            return 0;
        ssize_t ret = send(_sockfd, buf, len, flag);
        if (ret < 0)
        {
            if (errno == EAGAIN || errno == EINTR)
            {
                return 0;
            }
            ERROR_LOG("SOCKET SEND FAILED!!");
            return -1;
        }
        return ret;
    }
    // 非阻塞发送数据
    ssize_t SendNonBlock(const void *buf, size_t len)
    {
        return Send(buf, len, MSG_DONTWAIT);
    }

    // 关闭socket
    void Close()
    {
        if (_sockfd != -1)
        {
            close(_sockfd);
            _sockfd = -1;
        }
    }
    // 创建一个服务器连接
    bool CreateSever(uint16_t port, const std::string &ip = "0.0.0.0", int block_flag = 0)
    {
        // 创建套接字
        if (!Create())
            return false;
        Reuse();
        // 绑定套接字
        if (!Bind(ip, port))
            return false;
        // 设置监听
        if (!Listen())
            return false;
        if (block_flag)
            SetNonBlock();
        return true;
        // 设置ip和端口重复使用
    }
    // 创建一个客户端连接
    bool CreateClient(uint16_t port, const std::string &ip, int block_flag = 0)
    {
        // 创建套接字
        if (!Create())
            return false;
        // 发起连接
        if (!Connect(ip, port))
            return false;
        // 设置非阻塞
        if (block_flag)
            SetNonBlock();
        return true;
    }
    // 设置端口地址复用
    bool Reuse()
    {
        int val = 1;
        int ret = setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int));
        if (ret < 0)
            return false;
        val = 1;
        ret = setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int));
        if (ret < 0)
            return false;
        return true;
    }
    // 设置非阻塞
    bool SetNonBlock()
    {
        int ret = fcntl(_sockfd, F_GETFL);
        if (ret < 0)
            return false;
        ret = fcntl(_sockfd, F_SETFL, ret | O_NONBLOCK);
        return true;
    }
};

class EventLoop;
class Epoller;
using EventCallBack = std::function<void()>;
class Channel
{
private:
    int _fd;
    EventLoop *_loop;
    // 说明对应文件描述符需要监控什么事件
    uint32_t _event;
    // 记录什么事件就绪了
    uint32_t _revent;
    // 读事件回调函数
    EventCallBack _ReadCb;
    // 写事件回调函数
    EventCallBack _WriteCb;
    // 错误事件回调函数
    EventCallBack _ErrorCb;
    // 对方关闭连接的回调函数
    EventCallBack _CloseCb;
    // 任意事件就绪的回调函数
    EventCallBack _EventCb;

public:
    Channel(EventLoop *loop, int fd)
        : _fd(fd), _event(0), _revent(0), _loop(loop)
    {
    }

    int Fd() { return _fd; }
    uint32_t Event() { return _event; }
    void SetRevent(uint32_t event) { _revent = event; };
    void SetReadcb(const EventCallBack &cb) { _ReadCb = cb; }
    void SetWritecb(const EventCallBack &cb) { _WriteCb = cb; }
    void SetErrorcb(const EventCallBack &cb) { _ErrorCb = cb; }
    void SetClosecb(const EventCallBack &cb) { _CloseCb = cb; }
    void SetEventcb(const EventCallBack &cb) { _EventCb = cb; }
    // 以下的几个函数只是对 _event 进行了操作，实际上还需要把它们放在EventLoop中去
    //  描述符是否监控可读
    bool ReadAble() { return _event & EPOLLIN; }
    //  描述符是否监控可写
    bool WriteAble() { return _event & EPOLLOUT; }
    //  对描述符监控可读
    void EnableRead()
    {
        // 或上才能成功的更新
        _event |= EPOLLIN;
        Update();
    }
    //  对描述符监控可写
    void EnableWrite()
    {
        _event |= EPOLLOUT;
        Update();
    }
    //  解除可读事件监控
    void DisableRead()
    {
        _event &= ~EPOLLIN;
        Update();
    }
    //  解除可写事件监控
    void DisableWrite()
    {
        _event &= ~EPOLLOUT;
        Update();
    }
    //  解除所有事件监控
    void DisableAll()
    {
        _event = 0;
        Update();
    }
    //  移除所有事件
    void Remove();
    void Update();
    // 一旦事件就绪了就调用这个函数
    void HandlerEvent()
    {
        // 若是读事件就绪，就调用读取回调函数
        // 如果是客户端关闭了连接（EPOLLRDHUP）触发也需要将数据读取掉
        // 若是客户端发送了紧急数据，也需要调用读取回调函数
        if (_revent & EPOLLIN || _revent & EPOLLRDHUP || _revent & EPOLLPRI)
        {
            // 任意事件就绪都需要调用这个函数
            if (_ReadCb)
                _ReadCb();
        }
        // 写，错误，和异常有可能会关闭连接，也有可能重复关闭，
        //  若是写事件就绪就调用写回调函数
        if (_revent & EPOLLOUT)
        {
            // 任意事件就绪都需要调用这个函数
            if (_WriteCb)
                _WriteCb();
        }
        // 若是发生错误，则调用错误回调函数
        else if (_revent & EPOLLERR)
        {

            if (_ErrorCb)
                _ErrorCb();
        }
        // 若是对方连接异常或者断开就调用关闭函数
        else if (_revent & EPOLLHUP)
        {

            if (_CloseCb)
                _CloseCb();
        }
        if (_EventCb)
            _EventCb();
    }
};

#define MAX_EPOLLEVENT 1024
class Epoller
{
private:
    // 指向底层的epoll红黑数的文件描述符
    int _epfd;
    // 保存所有就绪的事件
    struct epoll_event _events[MAX_EPOLLEVENT];
    // 保存所有活跃的事件
    std::unordered_map<int, Channel *> _channels;

private:
    void Update(Channel *channel, int op)
    {
        // int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
        int fd = channel->Fd();
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = channel->Event();
        int ret = epoll_ctl(_epfd, op, fd, &ev);
        if (ret < 0)
        {
            ERROR_LOG("EPOLL_CTL FAILED!");
        }
        return;
    }
    bool HasChannel(Channel *channel)
    {
        auto it = _channels.find(channel->Fd());
        if (it == _channels.end())
        {
            return false;
        }
        return true;
    }

public:
    Epoller()
    {
        _epfd = epoll_create(MAX_EPOLLEVENT);
        if (_epfd < 0)
        {
            ERROR_LOG("EPOLL_CREATE FAILED!");
            abort();
        }
    }
    // 如果fd不在_channels内，就添加，存在就更新
    void UpdateEpoll(Channel *channel)
    {
        bool ret = HasChannel(channel);
        // 若是该channel不存在 就添加
        if (ret == false)
        {
            // std::cout << "ADD" << std::endl;
            _channels.insert(std::make_pair(channel->Fd(), channel));
            return Update(channel, EPOLL_CTL_ADD);
        }
        // 存在就修改
        // std::cout << "MOD" << std::endl;
        Update(channel, EPOLL_CTL_MOD);
    }
    // 移除epoll中的fd
    void RemoveEpoll(Channel *channel)
    {
        auto it = _channels.find(channel->Fd());
        if (it != _channels.end())
        {
            _channels.erase(it);
        }
        Update(channel, EPOLL_CTL_DEL);
    }
    // 开始监控 返回一个活跃的数组
    void Poll(std::vector<Channel *> *active)
    {
        //  int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
        int ret = epoll_wait(_epfd, _events, MAX_EPOLLEVENT, -1);
        if (ret < 0)
        {
            if (errno == EINTR)
            {
                return;
            }
            ERROR_LOG("EPOLL_WAIT FAILE :%s\n", strerror(errno));
            abort();
        }
        for (int i = 0; i < ret; i++)
        {
            auto it = _channels.find(_events[i].data.fd);
            assert(it != _channels.end());
            it->second->SetRevent(_events[i].events);
            active->push_back(it->second);
        }
        return;
    }
};

using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimeTask
{
private:
    uint64_t _id;      // 任务id，用来识别不同任务
    uint32_t _timeout; // 任务的超时时间
    TaskFunc _cb;      // 析构时任务所需要执行的回调函数
    ReleaseFunc _release;
    bool _canceled; // 表明是否把任务完成，完成了就可以删除了
public:
    TimeTask(uint64_t id, uint32_t timeout, const TaskFunc &cb)
        : _id(id), _timeout(timeout), _cb(cb), _canceled(false)
    {
    }
    ~TimeTask()
    {
        if (_canceled == false)
            _cb();
        _release();
    }
    void SetRelease(const ReleaseFunc &cb)
    {
        _release = cb;
    }
    uint32_t Delay_Time()
    {
        return _timeout;
    }
    void Cancel()
    {
        _canceled = true;
    }
};
using PtrTask = std::shared_ptr<TimeTask>;
using WeakTask = std::weak_ptr<TimeTask>;
class TimerWheel
{
private:
    int _tick;                                      // 秒针，走到哪，释放到哪
    int _capacity;                                  // 该时间轮最大的延迟时间
    std::vector<std::vector<PtrTask>> _wheel;       // 时间轮，用来记录任务
    std::unordered_map<uint64_t, WeakTask> _timers; // 存储id对应的任务
    int _timerfd;                            // 一个秒针计时器
    EventLoop *_loop;                        // 用来防止线程安全问题出现
    std::unique_ptr<Channel> _timer_channel; // 给_timerfd设置可读函数

private:
    void Remove(uint64_t id) // 用来删除在_timers中记录的任务对象
    {
        auto it = _timers.find(id);
        if (it != _timers.end())
        {
            _timers.erase(it);
        }
    }
    static int CreateTimerfd()
    {
        int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); // 是相对于启动该Timerfd后的时间
        if (timerfd < 0)
        {
            ERROR_LOG("CREATE TIMERFD FAILE!");
            abort();
        }
        struct itimerspec time_set;
        time_set.it_value.tv_sec = 1;
        time_set.it_value.tv_nsec = 0;
        time_set.it_interval.tv_sec = 1;
        time_set.it_interval.tv_nsec = 0;
        timerfd_settime(timerfd, 0, &time_set, NULL);
        return timerfd;
    }
    void TimerAddInLoop(uint64_t id, uint32_t timeout, const TaskFunc &cb) // 添加任务到任务队列中去
    {
        PtrTask pt(new TimeTask(id, timeout, cb));
        pt->SetRelease(std::bind(&TimerWheel::Remove, this, id)); // 类内成员函数bind需要表明是哪个类成员函数
        int pos = (_tick + timeout) % _capacity;
        _wheel[pos].push_back(pt);
        _timers[id] = WeakTask(pt);
    }
    void TimerRefreshInLoop(uint64_t id) // 刷新对应的任务
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return;
        }
        PtrTask pt = it->second.lock();
        int timeout = pt->Delay_Time();
        int pos = (_tick + timeout) % _capacity;
        _wheel[pos].push_back(pt);
    }
    void TimerFdCancelInLoop(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return;
        }
        PtrTask pt = it->second.lock();
        if (pt)
            pt->Cancel();
        return;
    }
    int TimerFdRead() // 每隔一秒就会触发一次读事件
    {
        uint64_t times;
        int ret = read(_timerfd, &times, 8);
        if (ret < 0)
        {
            ERROR_LOG("TIMERFD READ FAILED!");
            abort();
        }
        return times;
    }
    void RunTimerTask()
    {
        _tick = (_tick + 1) % _capacity;
        _wheel[_tick].clear();
    }
    void OnTime()
    {
        int times = TimerFdRead();
        for (int i = 0; i < times; i++)
        {
            RunTimerTask();
        }
    }
public:
    TimerWheel(EventLoop *loop) : _tick(0), _capacity(60), _wheel(_capacity), _timerfd(CreateTimerfd()), _loop(loop), _timer_channel(new Channel(loop, _timerfd))
    {
        // 只需要读取timerfd看是否超时
        _timer_channel->SetReadcb(std::bind(&TimerWheel::OnTime, this));
        _timer_channel->EnableRead();
    }
    void TimerAdd(uint64_t id, uint32_t timeout, const TaskFunc &cb); // 添加任务到任务队列中去
    void TimerRefresh(uint64_t id);                                   // 刷新对应的任务
    void TimerCancel(uint64_t id);
    // 存在线程安全问题
    bool HasTask(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return false;
        }
        return true;
    }
};

class EventLoop
{
private:
    using Functor = std::function<void()>;
    int _event_fd;                     // 用于事件通知的文件描述符
    Epoller _poller;                   // 用于进行事件监控的poller
    std::unique_ptr<Channel> _channel; // 给_event_fd设置对应的回调函数
    std::vector<Functor> _task_queue;  // 任务池，该线程对应的任务都放在这里
    std::thread::id _id;               // 该EventLoop对应的线程id
    std::mutex _mutex;                 // 互斥锁，保证任务池的线程安全
    TimerWheel _timer_wheel;           // 一个定时器
private:
    void RunAllTask() // 执行所有任务
    {
        std::vector<Functor> functor;
        {
            // 限定作用域，保证_task_queue的线程安全
            std::unique_lock<std::mutex> lock(_mutex);
            _task_queue.swap(functor);
        }
        for (auto &t : functor)
        {
            t();
        }
        return;
    }
    // 创建一个event_fd
    static int Create_Eventfd()
    {
        int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
        if (efd < 0)
        {
            ERROR_LOG("CREATE EVENTFD FAILE!");
            abort();
        }
        return efd;
    }
    void Read_Event_Fd()
    {
        uint64_t out;
        int ret = read(_event_fd, &out, sizeof(out));
        if (ret < 0)
        {
            if (errno == EAGAIN || errno == EINTR)
            {
                return;
            }
            ERROR_LOG("EVENTFD READ FAILED");
            abort();
        }
        return;
    }
    void WakeUpEvent()
    {
        uint64_t in = 1;
        int ret = write(_event_fd, &in, sizeof(in));
        if (ret < 0)
        {
            if (errno == EINTR)
            {
                return;
            }
            ERROR_LOG("EVENTFD WRITE FAILED");
            abort();
        }
    }

public:
    EventLoop() : _id(std::this_thread::get_id()),
                  _event_fd(Create_Eventfd()),
                  _channel(new Channel(this, _event_fd)),
                  _timer_wheel(this)
    {
        // 给_event_fd 设置可读
        _channel->SetReadcb(std::bind(&EventLoop::Read_Event_Fd, this));
        _channel->EnableRead();
    }
    void Start() // 启动EventLoop:1：进行事件监控 2：事件就绪压入队列 3：执行任务
    {
        // 启动事件监控
        while (1)
        {
            std::vector<Channel *> actives;
            _poller.Poll(&actives);
            // 事件就绪压入队列
            for (auto &t : actives)
            {
                t->HandlerEvent();
            }
            RunAllTask();
        }
    }
    void RunInLoop(const Functor &cb) // 是对应的EventLoop的任务，就执行，不是就放入对应的任务池中
    {
        if (IsInLoop())
        {
            cb();
            return;
        }
        QueueInLoop(cb);
    }
    void QueueInLoop(const Functor &cb) // 将任务压入任务池中
    {
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _task_queue.push_back(cb);
        }
        // 防止阻塞在Poll中，需要手动给eventfd设置写入数据，来唤醒Poll
        WakeUpEvent();
    }
    bool IsInLoop() // 用来判断是否在对应的线程中
    {
        return _id == std::this_thread::get_id();
    }
    void UpdateEpoll(Channel *channel) // 修改或者添加事件监控
    {
        _poller.UpdateEpoll(channel);
    }
    void RemoveEpoll(Channel *channel) // 删除事件监控
    {
        _poller.RemoveEpoll(channel);
    }
    void AssertInLoop()
    {
        assert(_id == std::this_thread::get_id());
    }
    void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { _timer_wheel.TimerAdd(id, delay, cb); }
    void TimerRefresh(uint64_t id) { _timer_wheel.TimerRefresh(id); }
    void TimerCancel(uint64_t id) { _timer_wheel.TimerCancel(id); }
    bool IsInWheel(uint64_t id) { return _timer_wheel.HasTask(id); }
};
class LoopThread
{
private:
    // 互斥量和条件变量，保证获取对应的EventLoop时_loop已经实例化了
    // 没有实例化则wait
    std::mutex _mutex;
    std::condition_variable _cond;
    EventLoop *_loop;
    std::thread _thread;

private:
    // 实例化Loop对象，然后唤醒其他条件变量并且运行
    void EventLoopEntry()
    {
        EventLoop loop;
        {
            DEBUG_LOG("Thread: ");
            std::unique_lock<std::mutex> lock(_mutex);
            _loop = &loop;
            _cond.notify_all();
        }
        loop.Start();
    }

public:
    // 先创建线程，后创建Loop
    LoopThread() : _loop(NULL), _thread(std::thread(&LoopThread::EventLoopEntry, this))
    {
    }
    // 获取该线程对应的EventLoop
    EventLoop *GetEventLoop()
    {
        EventLoop *loop = NULL;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _cond.wait(lock, [&]()
                       { return _loop != NULL; });
            loop = _loop;
        }
        return loop;
    }
};
class LoopThreadPool
{
private:
    int _thread_count;
    int _next_loop;        // 采用RR轮制来选择运行的Loop，这是指向的下标
    EventLoop *_base_loop; // 主线程运行的基础Loop，没有从属线程就让它来运行
    std::vector<LoopThread *> _threads;
    std::vector<EventLoop *> _loops;

public:
    LoopThreadPool(EventLoop *base_loop) : _base_loop(base_loop), _thread_count(0), _next_loop(0) {}
    void SetThreadCount(int count) { _thread_count = count; } // 设置从属线程的个数
    // 创建从属线程
    void Create()
    {
        if (_thread_count > 0)
        {
            _threads.resize(_thread_count);
            _loops.resize(_thread_count);
            for (int i = 0; i < _thread_count; i++)
            {
                _threads[i] = new LoopThread();
                _loops[i] = _threads[i]->GetEventLoop();
            }
        }
        return;
    }
    // 获取该线程对应的EventLoop
    EventLoop *GetEventLoop()
    {
        if (_thread_count == 0)
            return _base_loop;
        _next_loop = (_next_loop + 1) % _thread_count;
        return _loops[_next_loop];
    }
};
// 为了防止出现Connection对象已经被释放但是还是被使用的情况，所有使用shared_ptr来管理Connection对象
// 而外界对Connnection对象进行操作的时候都用过用过shared_ptr来管理
// 而用shared_ptr管理就需要设置Connection对象的状态，看是否已经释放或者是需要释放，是否已经创建还是刚创建，未设置协议的回调
// DISCONNECTED 表示已经释放了，需要关闭该对象 CONNECTING 表示刚创建，还未设置协议和回调
// CONNECTED 表示已经创建完毕，已经设置协议和回调 DISCONNECTING 表示需要销毁，不过需要提前看一下缓冲区是否还有数据
// debug
// class Test
// {
// private:
//     EventLoop *_loop;
// public:
//     Test(EventLoop *loop,uint64_t id)
//     {
//     }
// };

class Connection;
typedef enum
{
    DISCONNECTED,
    CONNECTING,
    CONNECTED,
    DISCONNECTING
} ConnecStatu;
typedef std::shared_ptr<Connection> PtrConnection;
class Connection : public enable_shared_from_this<Connection>
{
private:
    uint64_t _con_id;              // 一个唯一的字符，用来标示唯一的Connection对象
                                   // 同时也可以用做时间轮的id；
    int _sock_fd;                  // 对套接字进行控制的fd
    Buffer _in_buffer;             // 输入缓冲区
    Buffer _out_buffer;            // 输出缓冲区
    Any _context;                  // 一个Any对象，用来记录不同类型协议的上下文
    bool _enable_inactive_release; // 默认为false，用来确认是否启动了非活跃连接的销毁功能
    ConnecStatu _statu;
    EventLoop *_loop;
    Socket _socket;   // 管理socket
    Channel _channel; // 对套接字的各种操作
    // 这些回调并不是针对某一个事件(读写之类的事件)而是针对读事件后的业务处理
    using ConnectCallBack = std::function<void(const PtrConnection &)>;
    using CloseCallBack = std::function<void(const PtrConnection &)>;
    using MessageCallBack = std::function<void(const PtrConnection &, Buffer *)>;
    using AnyEventCallBack = std::function<void(const PtrConnection &)>;
    ConnectCallBack _connect;
    CloseCallBack _close;
    MessageCallBack _msg;
    AnyEventCallBack _any_event;

    CloseCallBack _sever_close; // 给Sever来关闭Connection对象的函数
private:
    void HandlerRead()
    {
        // 1.从socket中读取数据
        char buff[65536];
        ssize_t ret = _socket.RecvNonBlock(buff, 65535);
        // debug cout<<"Read ret = "<<ret<<endl;
        if (ret < 0)
        {
            // 小于0表示读取出现错误,但是不能直接关闭连接
            // 要先查看是否还有遗留的数据待处理
            return ShutdownInloop();
        }
        // 2.有数据就放入Buffer中，再调用message_callback来进行业务逻辑处理
        _in_buffer.WriteAndPush(buff, ret);
        if (_in_buffer.ReadAbleSize() > 0)
        {
            _msg(shared_from_this(), &_in_buffer);
        }
    }
    void HandlerWrite()
    {
        // 触发了写事件后
        // 将发送缓冲区的东西通过socket发送出去
        // DEBUG_LOG("outbuffer : %s",_out_buffer.ReadPosition());
        ssize_t ret = _socket.SendNonBlock(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());
        // DEBUG_LOG("ret = %ld",ret);

        // debug cout<<"Write ret = "<<ret<<endl;
        if (ret < 0)
        {
            // 若是写失败了，就说明对方连接关闭了
            // 但是在释放之前，还要查看接收缓冲区中是否还有数据
            if (_in_buffer.ReadAbleSize() > 0)
                _msg(shared_from_this(), &_in_buffer);
            return Release();
        }
        // 而若是没写失败,就查看是否是待关闭的状态,或者输出缓冲区是否还有数据
        _out_buffer.MoveReadOffSet(ret);
        if (_out_buffer.ReadAbleSize() == 0)
        {
            // 关闭写事件监控,防止重复写事件
            _channel.DisableWrite();
            if (_statu == DISCONNECTING)
            {
                // 若是待关闭状态
                return Release();
            }
        }
        return;
    }
    void HandlerClose()
    {
        if (_in_buffer.ReadAbleSize() > 0)
        {
            _msg(shared_from_this(), &_in_buffer);
        }
        Release();
    }
    void HandlerError() { HandlerClose(); }
    void HandlerAnyEvent()
    {
        if (_enable_inactive_release == true)
        {
            _loop->TimerRefresh(_con_id);
        }
        if (_any_event)
            _any_event(shared_from_this());
    }
    // 创建后用来设置对应的回调函数，设置读监控之类的操作
    void EstablishedInLoop()
    {
        // 1.设置连接状态
        assert(_statu == CONNECTING);
        _statu = CONNECTED;
        // 2.启动读监控
        // 这个操作应该在设置时间轮的定时销毁任务后进行
        // 若是在启动读监控之后再设置定时销毁任务，可能会出现未设置销毁任务时间轮就已经到了的情况
        // 从而导致野指针
        _channel.EnableRead();
        // 3.调用回调函数
        if (_connect != NULL)
        {
            if (shared_from_this() != NULL)
                _connect(shared_from_this());
        }
    }
    // 真正的释放
    void ReleaseInloop()
    {
        // 1.设置状态
        _statu = DISCONNECTED;
        // 2.移除事件监控
        _channel.Remove();
        // 3.关闭文件描述符
        _socket.Close();
        // 3.5 取消定时器中的销毁任务，防止野指针操作
        if (_loop->IsInWheel(_con_id))
            DisableInactiveReleaseInloop();
        // 4.调用用户设置的close函数
        // 先调用用户的然后再调用服务器的，能够防止connection对象被释放从而导致野指针
        if (_close)
            _close(shared_from_this());
        // 5.调用sever设置的close函数
        if (_sever_close)
            _sever_close(shared_from_this());
    }
    void SendInloop(Buffer& buf)
    {
        if (_statu == DISCONNECTED)
        {
            DEBUG_LOG("_statu == DISCONNECTED");
            return;
        }
        // 将数据放入发送缓冲区
        _out_buffer.WriteAsBufferAndPush(buf);
        // 启动写监控
        if (_channel.WriteAble() == false)
            _channel.EnableWrite();
    }
    void ShutdownInloop()
    {
        // 修改状态
        _statu = DISCONNECTING;
        // 判断发送缓冲区和接收缓冲区是否有数据待处理
        // 若是接收缓冲区还有东西，就调用业务处理回调函数
        if (_in_buffer.ReadAbleSize() > 0)
        {
            _msg(shared_from_this(), &_in_buffer);
        }
        // 若是发送缓冲区还有数据，就调用
        if (_out_buffer.ReadAbleSize() > 0)
        {
            if (_channel.WriteAble() == false)
                _channel.EnableWrite();
        }
        // 没有数据就直接释放
        if (_out_buffer.ReadAbleSize() == 0)
            Release();
    }
    // 启动非活跃连接
    void EnableInactiveReleaseInloop(uint32_t sec)
    {
        // 1.设置enable_inactive_release
        _enable_inactive_release = true;
        // 2.添加定时销毁任务
        if (_loop->IsInWheel(_con_id))
            return _loop->TimerRefresh(_con_id); // 若是已经添加了，就刷新
        // 否则就添加
        _loop->TimerAdd(_con_id, sec, std::bind(&Connection::Release, this));
    }
    // 关闭非活跃连接销毁
    void DisableInactiveReleaseInloop()
    {
        _enable_inactive_release = false;
        if (_loop->IsInWheel(_con_id))
            _loop->TimerCancel(_con_id);
    }
    void UpgradeInloop(const Any &context,
                       const ConnectCallBack &con,
                       const CloseCallBack &close,
                       const MessageCallBack &msg,
                       const AnyEventCallBack &anyevent) // 更换连接,需要修改上下文，以及各种回调
    {
        _context = context;
        _connect = con;
        _close = close;
        _msg = msg;
        _any_event = anyevent;
    }

public:
    Connection(EventLoop *loop, uint64_t con_id, int sock_fd)
        : _loop(loop), _con_id(con_id), _sock_fd(sock_fd), _channel(_loop, _sock_fd),
          _enable_inactive_release(false), _statu(CONNECTING), _socket(_sock_fd)
    {
        _channel.SetReadcb(std::bind(&Connection::HandlerRead, this));
        _channel.SetWritecb(std::bind(&Connection::HandlerWrite, this));
        _channel.SetErrorcb(std::bind(&Connection::HandlerError, this));
        _channel.SetClosecb(std::bind(&Connection::HandlerClose, this));
        _channel.SetEventcb(std::bind(&Connection::HandlerAnyEvent, this));
    }
    ~Connection() { DEBUG_LOG("RELEASE CONNECTION : %p", this); }
    uint64_t Id() { return _con_id; }
    int Fd() { return _sock_fd; }
    bool Connected() { return _statu == CONNECTED; } // 查看是否处于连接状态
    void SetContext(const Any &context)
    {
        _context = context;
    }
    // 返回上下文指针,方便外部进行修改
    Any *GetContext() { return &_context; }
    void Established() { _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this)); }
    void SetConnectCallBack(const ConnectCallBack &cb) { _connect = cb; }
    void SetCloseCallBack(const CloseCallBack &cb) { _close = cb; }
    void SetMessageCallBack(const MessageCallBack &cb) { _msg = cb; }
    void SetAnyEventCallBack(const AnyEventCallBack &cb) { _any_event = cb; }
    void SetSrvCloseCallBack(const CloseCallBack &cb) { _sever_close = cb; }
    void Send(const char *data, uint64_t len)
    {
        Buffer buf;
        buf.WriteAndPush(data,len);
        _loop->RunInLoop(std::bind(&Connection::SendInloop, this, buf));
    } // 并不是真正的发送数据，而是将数据放在输出缓冲区中
    void Shutdown()
    {
        _loop->RunInLoop(std::bind(&Connection::ShutdownInloop, this));
    } // 并不是真正的关闭连接，而是修改状态
    void Release()
    {
        _loop->QueueInLoop(std::bind(&Connection::ReleaseInloop,this));
    }
    void EnableInactiveRelease(int sec)
    {
        _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInloop, this, sec));
    } // 启动非活跃连接销毁,需要设置时间
    void DisableInactiveRelease()
    {
        _loop->RunInLoop(std::bind(&Connection::DisableInactiveReleaseInloop, this));
    } // 关闭非活跃连接销毁
    // 这个需要保证一旦调用就立马启动，因此需要保证在线程中被调用
    // 否则可能会出现协议还未更换，但是出现了新的读写任务，导致使用了原协议的函数从而出错
    void Upgrade(const Any &context, const ConnectCallBack &con, const CloseCallBack &close, const MessageCallBack &msg, const AnyEventCallBack &anyevent) // 更换连接,需要修改上下文，以及各种回调
    {
        _loop->AssertInLoop();
        _loop->RunInLoop(std::bind(&Connection::UpgradeInloop, this, context, con, close, msg, anyevent));
    }
};

class Acceptor
{
private:
    Socket _listen;
    EventLoop *_loop;
    Channel _channel;
    using AcceptorCallBack = std::function<void(int)>;
    AcceptorCallBack _acceptor_cb;

private:
    void HandlerRead()
    {
        int newfd = _listen.Accept();
        if (newfd < 0) // 小于0表示接收失败
            return;
        // 接收到后让Sever来进行后续的操作
        if (_acceptor_cb)
            _acceptor_cb(newfd);
    }
    int CreateSever(int port)
    {
        bool ret = _listen.CreateSever(port);
        assert(ret == true);
        return _listen.Fd();
    }

public:
    Acceptor(EventLoop *loop, int port)
        : _loop(loop), _listen(CreateSever(port)), _channel(loop, _listen.Fd())
    {
        _channel.SetReadcb(std::bind(&Acceptor::HandlerRead, this));
    }
    void SetAcceptorCallBack(AcceptorCallBack cb)
    {
        _acceptor_cb = cb;
    }
    void Listen()
    {
        _channel.EnableRead();
    }
};
using Functor = std::function<void()>;
class TCPSever
{
private:
    int _timeout;                  // 连接的同一时间，超过该时间未响应的就是非活跃连接
    bool _enable_inactive_release; // 设置是否启动非活跃连接销毁
    int _port;
    uint64_t _next_id;                                  // 一个自增长的id
    EventLoop _baseloop;                                // 主线程专门用来监听的基础对象
    Acceptor _acceptor;                                 // 一个用来进行监听连接的对象
    LoopThreadPool _loops;                              // 管理多个线程的线程池
    std::unordered_map<uint64_t, PtrConnection> _conns; // 管理多个Connection对象的

    // 给Connection对象设置的回调函数，由外部使用者设置
    using ConnectCallBack = std::function<void(const PtrConnection &)>;
    using CloseCallBack = std::function<void(const PtrConnection &)>;
    using MessageCallBack = std::function<void(const PtrConnection &, Buffer *)>;
    using AnyEventCallBack = std::function<void(const PtrConnection &)>;
    ConnectCallBack _connect;
    CloseCallBack _close;
    MessageCallBack _msg;
    AnyEventCallBack _any_event;

private:
    // 监听收到了一个新链接就需要调用该函数
    void NewConnection(int fd)
    {
        _next_id++;
        PtrConnection con(new Connection(_loops.GetEventLoop(), _next_id, fd));
        DEBUG_LOG("NEW CONNECTION %p",con.get());
        con->SetMessageCallBack(_msg);
        con->SetCloseCallBack(_close);
        con->SetConnectCallBack(_connect);
        con->SetAnyEventCallBack(_any_event);
        con->SetSrvCloseCallBack(std::bind(&TCPSever::RemoveFromCons, this, con));
        // 设置读监控和回调函数
        if (_enable_inactive_release == true)
        {
            con->EnableInactiveRelease(10);
        }
        con->Established();
        _conns.insert(std::make_pair(_next_id, con));
    }
    // 移除需要通过该函数将Connection从_cons中移除
    void RemoveFromConsInLoop(const PtrConnection &con)
    {
        DEBUG_LOG("RELEASE A CONNECTION :%p",con.get());
        uint64_t id = con->Id();
        auto it = _conns.find(id);
        if (it != _conns.end())
        {
            _conns.erase(it);
        }
    }
    void RemoveFromCons(const PtrConnection &con)
    {
        _baseloop.RunInLoop(std::bind(&TCPSever::RemoveFromConsInLoop, this, con));
    }
    void RunAfterInLoop(const Functor &task, int timeout)
    {
        _next_id++;
        _baseloop.TimerAdd(_next_id, timeout, task);
    }

public:
    TCPSever(int port) : _port(port),
                         _next_id(0),
                         _enable_inactive_release(false),
                         _acceptor(&_baseloop, _port),
                         _loops(&_baseloop)
    {
    }
    void SetThreadCount(int count)
    {
        _loops.SetThreadCount(count);
    } // 设置线程个数
    void SetConnectCallBack(const ConnectCallBack &cb) { _connect = cb; }
    void SetCloseCallBack(const CloseCallBack &cb) { _close = cb; }
    void SetMessageCallBack(const MessageCallBack &cb) { _msg = cb; }
    void SetAnyEventCallBack(const AnyEventCallBack &cb) { _any_event = cb; }
    // 启动非活跃连接销毁
    void EnableInactiveRelease(int timeout)
    {
        _timeout = timeout;
        _enable_inactive_release = true;
    }
    // 添加定时任务
    void RunAfter(const Functor &task, int timeout)
    {
        _baseloop.RunInLoop(std::bind(&TCPSever::RunAfterInLoop, this, task, timeout));
    }
    // 启动Sever
    void Start()
    {
        _loops.Create();
        _acceptor.SetAcceptorCallBack(std::bind(&TCPSever::NewConnection, this, std::placeholders::_1));
        _acceptor.Listen();
        _baseloop.Start();
    }
};

void Channel::Remove() { _loop->RemoveEpoll(this); }
void Channel::Update() { _loop->UpdateEpoll(this); }
void TimerWheel::TimerAdd(uint64_t id, uint32_t timeout, const TaskFunc &cb) // 添加任务到任务队列中去
{
    _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, timeout, cb)); //
}
void TimerWheel::TimerRefresh(uint64_t id) // 刷新对应的任务
{
    _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void TimerWheel::TimerCancel(uint64_t id)
{
    _loop->RunInLoop(std::bind(&TimerWheel::TimerFdCancelInLoop, this, id));
}

class NetWork
{
public:
    NetWork()
    {
        DEBUG_LOG("NetWork INIT");
        signal(SIGPIPE, SIG_IGN);
    }
};
static NetWork nw;